Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
/output
/test/output
build/
.cache

# Ignore hidden files
.*
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ You can use it to:
* [bthread or not](docs/cn/bthread_or_not.md)
* [thread-local](docs/cn/thread_local.md)
* [Execution Queue](docs/cn/execution_queue.md)
* [Active Task (experimental)](docs/cn/bthread_active_task.md)
* Client
* [Basics](docs/en/client.md)
* [Error code](docs/en/error_code.md)
Expand Down
1 change: 1 addition & 0 deletions README_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [bthread or not](docs/cn/bthread_or_not.md)
* [thread-local](docs/cn/thread_local.md)
* [Execution Queue](docs/cn/execution_queue.md)
* [Active Task(实验性)](docs/cn/bthread_active_task.md)
* [bthread tracer](docs/cn/bthread_tracer.md)
* Client
* [基础功能](docs/cn/client.md)
Expand Down
379 changes: 379 additions & 0 deletions docs/cn/bthread_active_task.md

Large diffs are not rendered by default.

142 changes: 142 additions & 0 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
// Date: Tue Jul 10 17:40:58 CST 2012

#include <sys/syscall.h>
#include <limits>
#include <string.h>
#include <vector>
#include <gflags/gflags.h>
#include "butil/macros.h" // BAIDU_CASSERT
#include "butil/logging.h"
Expand Down Expand Up @@ -85,6 +88,8 @@ pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
// Notice that we can't declare the variable as atomic<TaskControl*> which
// are not constructed before main().
TaskControl* g_task_control = NULL;
static pthread_mutex_t g_active_task_registry_mutex = PTHREAD_MUTEX_INITIALIZER;
static std::vector<bthread_active_task_type_t> g_active_task_types;

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
Expand All @@ -96,6 +101,89 @@ inline TaskControl* get_task_control() {
return g_task_control;
}

static bool normalize_active_task_type(const bthread_active_task_type_t* in,
bthread_active_task_type_t* out) {
if (in == NULL || out == NULL) {
return false;
}
if (in->struct_size < sizeof(bthread_active_task_type_t)) {
return false;
}
if (in->name == NULL || in->name[0] == '\0') {
return false;
}
if (in->worker_init == NULL && in->worker_destroy == NULL &&
in->harvest == NULL) {
return false;
}
memset(out, 0, sizeof(*out));
memcpy(out, in, sizeof(*out));
out->struct_size = sizeof(*out);
return true;
}

void get_active_task_types_snapshot(std::vector<bthread_active_task_type_t>* out) {
if (out == NULL) {
return;
}
BAIDU_SCOPED_LOCK(g_active_task_registry_mutex);
*out = g_active_task_types;
}

static inline TaskMeta* current_normal_bthread_for_local_pin(int* err) {
TaskGroup* g = tls_task_group;
if (g == NULL || g->is_current_main_task() || g->is_current_pthread_task()) {
if (err) {
*err = EPERM;
}
return NULL;
}
if (err) {
*err = 0;
}
return g->current_task();
}

static inline int enter_local_pin_scope(TaskMeta* m, TaskGroup* g) {
if (m == NULL || g == NULL) {
return EINVAL;
}
if (!m->local_pin_enabled) {
m->local_pin_home_group = g;
m->local_pin_home_control = g->control();
m->local_pin_home_tag = g->tag();
m->local_pin_depth = 1;
m->local_pin_enabled = true;
return 0;
}
if (m->local_pin_home_group != g ||
m->local_pin_home_control != g->control() ||
m->local_pin_home_tag != g->tag()) {
return EPERM;
}
if (m->local_pin_depth == std::numeric_limits<uint16_t>::max()) {
return EINVAL;
}
++m->local_pin_depth;
return 0;
}

static inline int leave_local_pin_scope(TaskMeta* m) {
if (m == NULL) {
return EINVAL;
}
if (!m->local_pin_enabled || m->local_pin_depth == 0) {
return EINVAL;
}
if (--m->local_pin_depth == 0) {
m->local_pin_enabled = false;
m->local_pin_home_group = NULL;
m->local_pin_home_control = NULL;
m->local_pin_home_tag = BTHREAD_TAG_INVALID;
}
return 0;
}

inline TaskControl* get_or_new_task_control() {
butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
TaskControl* c = p->load(butil::memory_order_consume);
Expand Down Expand Up @@ -146,6 +234,11 @@ bthread_t init_for_pthread_stack_trace() {
}

pthread_fake_meta->attr = BTHREAD_ATTR_PTHREAD;
pthread_fake_meta->local_pin_home_group = NULL;
pthread_fake_meta->local_pin_home_control = NULL;
pthread_fake_meta->local_pin_home_tag = BTHREAD_TAG_INVALID;
pthread_fake_meta->local_pin_depth = 0;
pthread_fake_meta->local_pin_enabled = false;
pthread_fake_meta->tid = make_tid(*pthread_fake_meta->version_butex, slot);
// Make TaskTracer use signal trace mode for pthread.
c->_task_tracer.set_running_status(syscall(SYS_gettid), pthread_fake_meta);
Expand Down Expand Up @@ -328,6 +421,55 @@ struct TidJoiner {

extern "C" {

int bthread_register_active_task_type(const bthread_active_task_type_t* type) {
bthread_active_task_type_t normalized;
if (!bthread::normalize_active_task_type(type, &normalized)) {
return EINVAL;
}
BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
if (bthread::get_task_control() != NULL) {
return EPERM;
}
BAIDU_SCOPED_LOCK(bthread::g_active_task_registry_mutex);
bthread::g_active_task_types.push_back(normalized);
return 0;
}

int bthread_butex_wake_within(const bthread_active_task_ctx_t* ctx,
void* butex) {
return bthread::TaskGroup::butex_wake_within_active_task(ctx, butex);
}

int bthread_butex_wait_local(void* butex, int expected_value,
const struct timespec* abstime) {
if (butex == NULL) {
errno = EINVAL;
return -1;
}
int err = 0;
bthread::TaskMeta* m = bthread::current_normal_bthread_for_local_pin(&err);
if (m == NULL) {
errno = err;
return -1;
}
bthread::TaskGroup* g = bthread::tls_task_group;
err = bthread::enter_local_pin_scope(m, g);
if (err != 0) {
errno = err;
return -1;
}
const int rc = bthread::butex_wait(butex, expected_value, abstime);
const int saved_errno = errno;
const int leave_err = bthread::leave_local_pin_scope(m);
if (leave_err != 0) {
LOG(ERROR) << "Fail to leave local pin scope after bthread_butex_wait_local";
errno = leave_err;
return -1;
}
errno = saved_errno;
return rc;
}

int bthread_start_urgent(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
Expand Down
Loading
Loading